package analysis;

import beans.MarketingUserBehavior;
import com.sun.xml.internal.fastinfoset.util.ValueArray;
import org.apache.flink.streaming.api.functions.source.SourceFunction;


import java.util.Arrays;
import java.util.List;
import java.util.Random;


/**
 * Flink source that emits simulated {@link MarketingUserBehavior} events.
 *
 * <p>Roughly once per second, {@link #run(SourceContext)} emits one event made of a
 * random user id, a behavior chosen at random from {@code behaviorList}, a channel
 * chosen at random from {@code channelList}, and the current wall-clock time in
 * milliseconds. Emission continues until {@link #cancel()} is called.
 *
 * @author zkq
 * @date 2022/10/5 23:39
 */
public class SimulatedMarketingBehaviorSource implements SourceFunction<MarketingUserBehavior> {
    // Run flag checked by the emit loop. It is cleared by cancel(), which Flink invokes
    // from a different thread than the one executing run(), so it must be volatile for
    // the loop to reliably observe the update.
    volatile boolean running = true;
    // Candidate user behaviors and marketing channels to sample from.
    List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
    List<String> channelList = Arrays.asList("app store", "weibo", "wechat", "tieba");
    Random random = new Random();

    /**
     * Emits one randomly generated event per loop iteration until the source is cancelled.
     *
     * @param ctx context used to emit the generated events
     * @throws Exception if emitting fails or the one-second sleep between events is
     *                   interrupted
     */
    @Override
    public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {
        while (running) {
            Long id = random.nextLong();
            String behavior = behaviorList.get(random.nextInt(behaviorList.size()));
            String channel = channelList.get(random.nextInt(channelList.size()));
            Long timestamp = System.currentTimeMillis();
            ctx.collect(new MarketingUserBehavior(id, behavior, channel, timestamp));
            // Throttle the simulated stream to about one event per second.
            Thread.sleep(1000);
        }
    }

    /**
     * Requests that {@link #run(SourceContext)} stop; the loop exits the next time it
     * checks the run flag.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
